2021-02-18

Outline

  • Setup
  • How to profile code
    • Look for “slow” sections
  • Making improvements
  • Single node parallel
    • Multiple cpus, 1 computer
  • Multi node parallel
    • Multiple cpus, multiple computers

Getting set up

  • Slides are available at https://uschpc.github.io/workshop-hpc-python/
  • git clone https://github.com/uschpc/workshop-hpc-python.git
  • Open index.html in browser
  • Examples are in the examples directory
  • Requirements:
    • python3
    • mpi implementation
      • mpi4py
    • hdf5
      • h5py (MPI parallel enabled)
    • snakeviz
    • viztracer
    • Most available on CARC clusters

Installing Dependencies

snakeviz (Must run this on local computer)

pip install snakeviz

viztracer

pip install viztracer

h5py (optional)

module load usc
module load python
module load hdf5
CC="mpicc" HDF5_MPI="ON" HDF5_DIR=$HDF5_ROOT pip3 install --no-binary=h5py h5py --user

Profiling and benchmarking

  • Aim for fast enough code
  • Compute time is less expensive than human time
  1. Profile code to understand the execution time and memory use of each part
  2. Identify bottlenecks (i.e., parts of code that take the most time)
  3. Try to improve performance of bottlenecks by modifying code
  4. Benchmark alternative code to identify best alternative

Profiling & Benchmarking

A few tools:

  • Profiling
    • cProfile, viztracer to generate profile info
    • snakeviz to display results
    • On CARC systems, download output files to view graphics locally
  • Benchmarking
    • timeit (Run a function repeatedly to get stats)
    • time (Get start and end time)
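
A minimal sketch of the two benchmarking tools listed above. The function sum_of_squares is a hypothetical stand-in for whatever code is being measured; it is not part of the workshop examples.

```python
import time
import timeit


def sum_of_squares(n):
    """Toy workload standing in for the code you want to measure."""
    return sum(k * k for k in range(n))


# time: record a start and end timestamp around a single call.
start = time.perf_counter()
result = sum_of_squares(100_000)
elapsed = time.perf_counter() - start
print(f"single run: {elapsed:.4f} s")

# timeit: repeat the call to get more stable statistics.
# repeat=5 gives five samples, each the total time for number=10 calls.
samples = timeit.repeat(lambda: sum_of_squares(100_000), repeat=5, number=10)
best_per_call = min(samples) / 10
print(f"best of 5: {best_per_call:.4f} s per call")
```

Taking the minimum of the samples is a common choice because slower samples usually reflect interference from other activity on the machine rather than the code itself.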

Example

  • examples/write.py
  • Generate some data, write to .txt file
  • Let’s measure performance for baseline
  • python -m cProfile -o write.log examples/write.py -n 100
    • Write 100 files
    • Generates binary file, need utility to view
    • snakeviz write.log
  • python -m cProfile -s tottime examples/write.py -n 100
    • Generates text output, might need tools to parse
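
The same profile data can also be collected and sorted from inside Python with the standard cProfile and pstats modules. The sketch below profiles a hypothetical busy_work function rather than the workshop's write.py. A file written with -o (such as write.log) can be loaded the same way by passing its name to pstats.Stats.

```python
import cProfile
import io
import pstats


def busy_work(n):
    """Hypothetical stand-in for generate_data."""
    return sum(k * k for k in range(n))


def main():
    return [busy_work(10_000) for _ in range(50)]


profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()

# Sort by tottime (time spent inside each function itself) and
# print at most the ten most expensive entries.
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats("tottime").print_stats(10)
report = stream.getvalue()
print(report)
```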

Interpreting results

  • Visualize which parts took the longest
  • Summary table below
  • generate_data called 1000000 times
    • consumed ~2.9s of runtime
  • npyio.py called 100 times
    • consumed ~0.75s of runtime

Interpreting results

This snippet is not great

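import numpy as np

def write_data(x,y,output,nFiles,fileID):
    filename=("output/%s%05d" %(output,fileID))

    i_max=len(x)
    j_max=len(y)
    data=np.zeros((i_max,j_max))

    # Slow: one Python-level call to generate_data per grid point
    for i in range(i_max):
        for j in range(j_max):
            data[i,j]=generate_data(x[i],y[j],nFiles,fileID)

    # Write once per file, after the array is filled
    np.savetxt(filename,data)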

General recommendations

  • Code first, optimize later (if needed)
  • Profile code to identify bottlenecks
  • Simplify when possible (do less)
  • Vectorize code
  • Use existing solutions
  • Parallelize when appropriate

Vectorizing code example

  • Packages like numpy often have built-in vectorized versions
  • Apply same operation to each element
  • Often written in C/C++/Fortran
  • Use built in functions when possible

numpy.meshgrid(*xi, copy=True, sparse=False, indexing='xy')

Return coordinate matrices from coordinate vectors. Make N-D coordinate arrays for vectorized evaluations of N-D scalar/vector fields over N-D grids, given one-dimensional coordinate arrays x1, x2,…, xn.
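
A small sketch of how meshgrid replaces a nested loop. The generate_data here is a hypothetical two-argument field, not the workshop's function. Note the axis order: with the default indexing='xy' the result is the transpose of the nested-loop array, while indexing='ij' matches the loop layout.

```python
import numpy as np


def generate_data(x, y):
    """Hypothetical scalar field; works on scalars and on arrays alike."""
    return np.sin(x) + np.cos(y)


x = np.arange(0.0, 4.0, 1.0)   # 4 points
y = np.arange(0.0, 3.0, 1.0)   # 3 points

# Loop version: data[i, j] pairs x[i] with y[j].
looped = np.zeros((len(x), len(y)))
for i in range(len(x)):
    for j in range(len(y)):
        looped[i, j] = generate_data(x[i], y[j])

# Vectorized version with the default indexing='xy':
# X and Y have shape (len(y), len(x)), so the result is transposed.
X, Y = np.meshgrid(x, y)
Z_xy = generate_data(X, Y)

# indexing='ij' keeps the same axis order as the nested loops.
Xi, Yi = np.meshgrid(x, y, indexing="ij")
Z_ij = generate_data(Xi, Yi)

print(Z_xy.shape, Z_ij.shape)
```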

Vectorizing code example

  • Usually if you’re implementing a loop there’s a faster way
def write_data(X,Y,output,nFiles,i):
    filename=("output/%s%05d" %(output,i))

    # One vectorized call fills the whole array
    Z = generate_data(X,Y,nFiles,i)
    np.savetxt(filename,Z)

Where

x = np.arange(x_origin-size/2,x_origin+size/2,1)
y = np.arange(y_origin-size/2,y_origin+size/2,1)
X,Y = np.meshgrid(x,y)
  • See example under examples/write_vectorized.py

Check performance

python3 -m cProfile -s tottime examples/write_vectorized.py -n 100 write_vectorized > write_vectorized

ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
   100    0.870    0.009    1.145    0.011  npyio.py:1191(savetxt)
   200    0.160    0.001    0.162    0.001  {built-in method io.open}
   115    0.075    0.001    0.075    0.001  {built-in method io.open_code}
 34/32    0.054    0.002    0.058    0.002  {built-in method _imp.create_dynamic}
 10000    0.051    0.000    0.051    0.000  {method 'write' of '_io.TextIOWrapper' objects}
   100    0.044    0.000    0.044    0.000  write_vectorized.py:6(generate_data)

Check performance

Original write.py ~4.8s

 ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
1000000    2.854    0.000    2.854    0.000  write.py:6(generate_data)
    100    0.749    0.007    1.011    0.010  npyio.py:1191(savetxt)
    100    0.673    0.007    4.540    0.045  write.py:11(write_data)

New write_vectorized.py ~ 1.4s

ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
   100    0.870    0.009    1.145    0.011  npyio.py:1191(savetxt)
   100    0.044    0.000    0.044    0.000  write_vectorized.py:6(generate_data)
   100    0.001    0.000    1.191    0.012  write_vectorized.py:11(write_data)

Read/write performance

  • Our example has the line
    np.savetxt(filename,data)
  • Generates many small files in plain text
  • Sacrifice readability for performance?
    np.save(filename,data)
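
A quick way to see the trade-off, as a sketch on a randomly generated 100x100 array (an assumed size, not taken from the workshop examples): write the same data in both formats, compare file sizes, and confirm both can be read back.

```python
import os
import tempfile

import numpy as np

data = np.random.default_rng(0).random((100, 100))

with tempfile.TemporaryDirectory() as tmp:
    txt_path = os.path.join(tmp, "data.txt")
    npy_path = os.path.join(tmp, "data.npy")

    np.savetxt(txt_path, data)   # plain text, human readable
    np.save(npy_path, data)      # binary .npy, not human readable

    txt_size = os.path.getsize(txt_path)
    npy_size = os.path.getsize(npy_path)

    # Both formats can be read back into an array.
    from_txt = np.loadtxt(txt_path)
    from_npy = np.load(npy_path)

print(f"text: {txt_size} bytes, binary: {npy_size} bytes")
```

The binary file stores 8 bytes per float64 value plus a small header, while the text file spends many more characters per value on formatting.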

Check performance

write_vectorized.py ~1.4s

ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
   100    0.870    0.009    1.145    0.011  npyio.py:1191(savetxt)
   100    0.044    0.000    0.044    0.000  write_vectorized.py:6(generate_data)
   100    0.001    0.000    1.191    0.012  write_vectorized.py:11(write_data)

examples/write_vectorized_binary.py ~0.46s

ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
   100    0.174    0.002    0.174    0.002  {built-in method io.open}
   100    0.044    0.000    0.044    0.000  write_vectorized_binary.py:6(generate_data)
   100    0.001    0.000    0.220    0.002  write_vectorized_binary.py:11(write_data)
   100    0.002    0.000    0.265    0.003  npyio.py:457(save)

Check performance

  • Performance is pretty good BUT
  • ~40% of time is WAITING on opening files
  • Keep in mind for later
  • Assume we have fine tuned program
  • Assume it’s worth our time to make it faster
  • What next?

Parallel programming

  • Key concept: speedup (decrease in runtime)
  • Parallel programming is easier with one compute node
    • Using multiple nodes requires more effort
  • “Parallel” can be different things
    • Data vs. task parallelism
    • “Pleasantly” parallel or “Embarrassingly” parallel
      • Concurrent execution of different parts of a larger computation
      • “High throughput computing”
    • Implicit vs. explicit parallel programming

Costs of parallelizing

  • Some computations are not worth parallelizing
  • Some costs to parallelizing (overhead):
    • changing code
    • spawning child processes
    • copying data and environment
    • communications
  • Speedup not proportional to number of cores (Amdahl’s law)
  • Optimal number of cores
    • depends on specific computations
    • experiment to find
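
Amdahl's law can be written as S(n) = 1 / ((1 - p) + p / n), where p is the fraction of runtime that can be parallelized and n is the number of cores. The sketch below evaluates it for an assumed p = 0.9, chosen only for illustration.

```python
def amdahl_speedup(parallel_fraction, n_cores):
    """Upper bound on speedup when only part of the runtime parallelizes."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_cores)


# Assume (for illustration) that 90% of the runtime can be parallelized.
for n_cores in (1, 2, 4, 8, 16, 64):
    print(f"{n_cores:3d} cores -> {amdahl_speedup(0.9, n_cores):5.2f}x")
```

With p = 0.9 the speedup can never exceed 10x no matter how many cores are added, because the serial 10% always remains.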

Hardware configuration

  • Compute nodes have different configurations
    • number of cores
    • amount of memory
  • On CARC systems:

Example Slurm job script for multiple cores

#!/bin/bash

#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=16GB
#SBATCH --time=1:00:00
#SBATCH --account=<account_id>

module purge
module load gcc/8.3.0
module load openblas/0.3.8
module load python/3.7.6

python3 /path/to/script.py
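
Inside the Python script, the number of workers can be matched to the allocation instead of being hard-coded. This is a sketch: workers_from_slurm is a hypothetical helper, and it relies on Slurm setting the SLURM_CPUS_PER_TASK environment variable when --cpus-per-task is given.

```python
import os


def workers_from_slurm(default=1):
    """Return the core count Slurm allocated to this task, or a fallback."""
    value = os.environ.get("SLURM_CPUS_PER_TASK")
    return int(value) if value else default


n_workers = workers_from_slurm()
print(f"using {n_workers} worker(s)")
```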

Conflicts with explicit and implicit parallelism

  • Be careful mixing implicit and explicit parallelism
  • Implicit parallel code may use more resources than intended
  • Turn off implicit parallelism with export OMP_NUM_THREADS=1
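
The same setting can be applied from within a script, as sketched below. The assumption here is that the numerical backend reads OMP_NUM_THREADS when it is first loaded, which is why the variable is set before numpy is imported. Some BLAS builds use their own variables instead, so treat this as a starting point.

```python
import os

# Must be set before numpy (and its BLAS/OpenMP backend) is imported,
# because the thread pool size is typically read at load time.
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np  # noqa: E402  (import deliberately placed after the setting)

a = np.ones((200, 200))
product = a @ a
print(product[0, 0])
```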

Python multiprocessing

  • multiprocessing is a built-in package
  • Create pool of processes
  • Typical example, apply a function over multiple inputs
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
  • Since we have multiple arguments
with mp.Pool(processes=nWorkers) as pool:
    for i in range(0,nFiles):
        pool.apply(write_data,args=(X,Y,output,nFiles,i,))
  • Also add -w option for nWorkers
  • See example under examples/write_multiprocessing.py

Viztracer Profiling

  • Let’s use Viztracer to profile
  • viztracer --log_multiprocess -o multiprocessing.html examples/write_multiprocessing.py -w 3 -n 100
  • use ‘wasd’ to navigate

Viztracer Profiling

  • For 1 worker, ~0.3 s
ncalls  tottime  function
   100     0.16  save
   101      0.1  recv_bytes
   100     0.04  generate_data
  • For 3 workers, ~0.3 s
ncalls  tottime  function
   203     0.39  enter
   103      0.4  recv_bytes
   100     0.04  generate_data
   100     0.22  save

Multiprocessing

  • Despite more workers, no speedup
  • There is communication overhead
  • This line is problematic:
pool.apply(write_data,args=(X,Y,output,nFiles,i))
  • During communication, variables are “pickled” or serialized
  • We pass in X,Y,output,nFiles even though they never change

Multiprocessing Queues

  • Use queues and custom process class
  • Each worker process is initialized with its data
  • Main process adds tasks to the queue
  • Worker processes get tasks from the queue and do the work

Custom process class

You can find this in examples/write_multiprocessing_queue.py
import multiprocessing as mp
import queue as q   # assumed alias: q.Empty is raised when get() times out
import time
import numpy as np

class worker(mp.Process):
    def __init__(self,task_queue,size,output,nFiles,**kwargs):
        super(worker,self).__init__()
        #print("In init(), Process %s starting." %self.pid)
        # Build the constant grid once per worker instead of per task
        x_origin=0
        y_origin=500
        x = np.arange(x_origin-size/2,x_origin+size/2,1)
        y = np.arange(y_origin-size/2,y_origin+size/2,1)
        self.X,self.Y = np.meshgrid(x,y)
        self.task_queue=task_queue
        self.nFiles=nFiles
        self.output=output

    def run(self):
        print("Starting Process:%d " % self.pid)
        time.sleep(1)
        while True:
            try:
                #print("Getting work")
                # Only the file index travels through the queue
                i = self.task_queue.get(timeout=1)
            except q.Empty:
                print("No more work to do")
                #self.terminate()
                break

            elapsed=write_data(self.X,self.Y,self.output,self.nFiles,i)
            #print("%s is SOO busy with %d" % (self.pid,i) )
            self.task_queue.task_done()
        return

Multiprocessing queue benchmarks

  • This “works” but scales badly
    • Dashed line represents linear scaling
  • Increasing the problem size helps a little
    • Start up overhead vs overall problem size decreases
  • Consider code complexity vs. scalability. Was it worth it?

mpi4py

  • mpi4py allows you to add MPI functionality to your scripts
  • Send messages across compute nodes
  • Easier to scale up
    • Assuming it’s worth communication overhead
  • Easier to read?

mpi4py example

  • Find it under examples/write_mpi.py

print("My name is rank %d"%rank+  " and I'm starting...")
if rank == 0:
    # Rank 0 splits the file indices into chunks and hands them out
    data = np.arange(nFiles,dtype='i')
    recipient=1
    for chunk in np.array_split(data,n_chunks):
        recipient=(recipient+1)%world_size
        if recipient==0:
            # Rank 0 keeps this chunk and does the work itself
            for i in chunk:
                write_data(X,Y,output,nFiles,i)
        else:
            comm.Send([chunk,MPI.INT], dest=recipient, tag=77)
else:
    # Every other rank receives one chunk and writes its files
    data=np.empty(int(nFiles/n_chunks),dtype='i')
    comm.Recv([data,MPI.INT],source=0, tag=77)
    data_string=np.array2string(data)
    for i in data:
        write_data(X,Y,output,nFiles,i)
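
One thing to check in code like this: the receive buffer is sized as nFiles/n_chunks, but np.array_split allows uneven chunks. The sketch below uses plain numpy (no MPI needed) with illustrative values that do not divide evenly. In that case some chunks are longer than the buffer, which MPI would be expected to report as a truncation error.

```python
import numpy as np

nFiles, n_chunks = 10, 4   # illustrative values that do not divide evenly

data = np.arange(nFiles, dtype="i")
chunks = np.array_split(data, n_chunks)
chunk_sizes = [len(chunk) for chunk in chunks]

# Size of the receive buffer as computed on the worker ranks.
recv_size = int(nFiles / n_chunks)

print("chunk sizes:", chunk_sizes)
print("receive buffer size:", recv_size)
```

Choosing nFiles as a multiple of n_chunks avoids the mismatch.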

mpi4py example

  • Pros
    • Just by using comm.Send and comm.Recv we can create our own workload manager
    • We get to use cProfile and pdb again
    • Code is easier to understand
    • Easier to scale up
  • Cons
    • Basic workload distribution
    • To debug/profile we need to look at N files/processes
    • Not that much faster
    • MPI communication slower than multiprocessing queues

MPI Scatter

  • mpi4py has a few convenience functions for common tasks
  • comm.Scatter splits an array on the root rank and sends one piece to each rank
  • Check for pre-built solutions
print("My name is rank %d"%rank+  " and I'm starting...")
chunks=None
if rank == 0:
    # Send pieces of data from rank 0 to whole world
    data   = np.arange(nFiles,dtype='i')
    chunks = data.reshape((n_chunks,int(nFiles/n_chunks)))

recvbuf = np.empty(int(nFiles/n_chunks),dtype='i')

comm.Scatter(chunks,recvbuf,root=0)

for i in recvbuf:
    #print("writing ...", i)
    write_data(X,Y,output,nFiles,i)
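
The layout Scatter expects can be previewed without MPI. In the sketch below, n_chunks stands in for the number of ranks and the loop imitates what each rank would receive: row r of the reshaped array goes to rank r. The values 12 and 4 are illustrative only.

```python
import numpy as np

nFiles, n_chunks = 12, 4   # illustrative; n_chunks plays the role of the number of ranks

if nFiles % n_chunks != 0:
    raise ValueError("nFiles must be divisible by n_chunks for this layout")

data = np.arange(nFiles, dtype="i")
chunks = data.reshape((n_chunks, nFiles // n_chunks))

# Scatter delivers row r of the send buffer to rank r.
for rank in range(n_chunks):
    recvbuf = chunks[rank]
    print(f"rank {rank} would write files {recvbuf.tolist()}")
```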

Parallel IO

  • In some cases file operations are a bottleneck
  • We saw in one example ~ 40% of time was waiting to open a file
  • It’s best to write one large file
  • Libraries like hdf5, h5py allow multiple processes to write to same file
  • Parallelization not needed for speedup

h5py

  • Starting from examples/write_vectorized_binary.py, we can modify write_data
  • See examples/write_hdf5.py
def write_data(X,Y,output,nFiles,i,hf):

    filename=("output/%s%05d" %(output,i))
    Z = generate_data(X,Y,nFiles,i)

    hf.create_dataset(filename,data=Z)
  • Where hf = h5py.File('output/data.h5', 'w')

h5py

examples/write_vectorized_binary.py ~5.46s (1000 writes)

ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
  1000    1.098    0.001    1.098    0.001  {built-in method io.open}
  1000    0.860    0.001    0.860    0.001  write_vectorized_binary.py:6(generate_data)
  1000    0.517    0.001    0.518    0.001  {method 'tofile' of 'numpy.ndarray' objects}
   637    0.451    0.001    0.451    0.001  {built-in method posix.stat}

examples/write_hdf5.py ~2.54s (1000 writes)

ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
  1000    0.873    0.001    0.873    0.001  write_hdf5.py:7(generate_data)
   778    0.235    0.000    0.235    0.000  {built-in method posix.stat}
  1000    0.211    0.000    0.242    0.000  dataset.py:38(make_new_dset)

Parallel IO with h5py

  • See examples/write_final.py
  • set hf=h5py.File('output/data.hdf5','w',driver='mpio',comm=comm)
  • Use same write_data function as single core version
  • Reading data is a bit harder
    • Use the dataset name (e.g. 'data00123') as the key, and h5py will return the dataset
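
A sketch of the read-back step using serial h5py, which needs no MPI build. The file location, dataset names and array contents are made up for illustration. The mpio driver version would additionally need h5py built against parallel HDF5.

```python
import os
import tempfile

import h5py
import numpy as np

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.h5")

    # Write a few datasets into one file, one per "file" index.
    with h5py.File(path, "w") as hf:
        for i in range(3):
            hf.create_dataset("data%05d" % i, data=np.full((4, 4), float(i)))

    # Read back: dataset names act like dictionary keys.
    with h5py.File(path, "r") as hf:
        names = sorted(hf.keys())
        Z = hf["data00002"][...]   # [...] copies the dataset into a numpy array

print(names)
print(Z.mean())
```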

Example Slurm job script for MPI job

#!/bin/bash

#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=3GB
#SBATCH --time=1:00:00
#SBATCH --account=<account_id>

module purge
module load gcc/8.3.0
module load openblas/0.3.8
module load openmpi/4.0.2
module load pmix/3.1.3
module load python/3.7.6

srun --mpi=pmix_v2 python3 /path/to/script.py

Additional resources

Thanks!